package cn.doitedu.rtdw.data_sync;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @Author: deep as the sea
 * @Site: <a href="www.51doit.com">多易教育</a>
 * @QQ: 657270652
 * @Date: 2023/2/5
 * @Desc: 学大数据，到多易教育
 *  业务库 视频信息表 同步数据到  hbase
 **/
public class S02_SyncJob_VideoInfoTable2Hbase {

    public static void main(String[] args) {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(2000);
        env.getCheckpointConfig().setCheckpointStorage("file:/d:/ckpt");
        env.setParallelism(1);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // 创建mysql中的源表的映射逻辑表(用cdc连接器)
        tEnv.executeSql("CREATE TABLE cms_video_source (   " +
                "      id INT,                                      " +
                "      video_name STRING,                           " +
                "      video_type STRING,                           " +
                "      video_album STRING,                          " +
                "      video_author STRING,                         " +
                "      video_timelong int,                          " +
                "      create_time TIMESTAMP(3),                    " +
                "      update_time TIMESTAMP(3),                    " +
                "     PRIMARY KEY (id) NOT ENFORCED            " +
                "     ) WITH (                                 " +
                "     'connector' = 'mysql-cdc',               " +
                "     'hostname' = 'doitedu'   ,               " +
                "     'port' = '3306'          ,               " +
                "     'username' = 'root'      ,               " +
                "     'password' = 'root'      ,               " +
                "     'database-name' = 'realtimedw',          " +
                "     'table-name' = 'cms_video'               " +
                ")");

        // 创建hbase中的目标表的逻辑映射表
        tEnv.executeSql(
                "CREATE TABLE cms_video_hbasesink( " +
                " id INT, " +
                " f ROW<video_name STRING, video_type STRING, video_album STRING, " +
                        "video_author STRING, video_timelong INT,create_time TIMESTAMP(3), update_time TIMESTAMP(3)>, " +
                " PRIMARY KEY (id) NOT ENFORCED " +
                ") WITH (                             " +
                " 'connector' = 'hbase-2.2',          " +
                " 'table-name' = 'dim_video_info',     " +
                " 'zookeeper.quorum' = 'doitedu:2181' " +
                ")");

        // 执行insert语句
        tEnv.executeSql(
                " insert into  cms_video_hbasesink                  "
                        +" select                                            "
                        +"   id,                                             "
                        +"   row(video_name, video_type, video_album,        "
                        +"      video_author , video_timelong ,create_time,  "
                        +"      update_time) as f                            "
                        +" from cms_video_source                             "
        );

    }
}
